Welcome to Apache Spark with Scala

Our goal with this notebook is to explore the Titanic dataset and train two classifiers that let us determine whether a given passenger was likely to survive based on his or her characteristics (e.g. age, gender, class). You can run this locally, but the same code can run in a distributed fashion because we are using the RDD abstraction from Apache Spark.

Some resources that we used to compose this notebook:

Spark 1.4.1 API

Decision Trees documentation

Logistic Regression documentation

1. Explore data

With the IBM Spark Kernel the default Spark context (sc) is already available, so let's start by loading the file and inspecting our data.


In [1]:
val rawRdd = sc.textFile("datasets/COUNT/titanic.csv")

In [2]:
rawRdd.count()


Out[2]:
1317

In [3]:
rawRdd.take(5)


Out[3]:
Array("","class","age","sex","survived", "1","1st class","adults","man","yes", "2","1st class","adults","man","yes", "3","1st class","adults","man","yes", "4","1st class","adults","man","yes")

As we can see above, our dataset contains a header line. Let's get rid of it and keep only the data points.


In [4]:
val header = rawRdd.first()
val dataRdd = rawRdd.filter( _ != header)

Let's verify that our new dataset no longer has the header.


In [5]:
dataRdd.first()


Out[5]:
"1","1st class","adults","man","yes"

We can also look at a few data points chosen at random. Try replacing the last parameter, 0L, with 3L; it is just the random seed.


In [6]:
dataRdd.takeSample(false, 5, 0L)


Out[6]:
Array("1222","3rd class","adults","women","no", "212","1st class","adults","women","yes", "284","1st class","adults","women","yes", "249","1st class","adults","women","yes", "1041","3rd class","adults","man","no")

2. Prepare data

In order to play with machine learning models we need a numerical representation of our data, so we need to translate our data points into feature vectors. You can think of a feature vector as just a list of numbers, where every number is a feature or an encoding of the data. Let's first process our data. Keep in mind that up to this point each record in our dataRdd object is a single string, so we need to split it into "columns".


In [7]:
// Split each line on commas; note that the fields keep their surrounding quotes (e.g. "1st class")
val rowsRdd = dataRdd.map(line => line.split(",").map(_.trim))

In [8]:
rowsRdd.take(2)

Let's create a function to convert data points to feature vectors. We need to feed LabeledPoint objects to our machine learning models.


In [9]:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

def toVector( row : Array[String] ) : LabeledPoint = {
    // The fields still carry their quotes, so charAt(1) is the class digit ('1', '2' or '3');
    // subtracting '0' and 1 encodes 1st/2nd/3rd class as 0.0, 1.0 and 2.0.
    val klass = row(1).charAt(1) - '0'.toDouble - 1
    val age = if (row(2).contains("adults")) 1 else 0    // 1 = adult, 0 = child
    val sex = if (row(3).contains("women")) 1 else 0     // 1 = woman, 0 = man
    val survived = if (row(4).contains("yes")) 1 else 0  // label: 1.0 = survived
    LabeledPoint(survived, Vectors.dense(klass, age, sex))
}
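
As a quick sanity check (a minimal sketch using the first data row we saw earlier; the expected value follows from the encoding above), we can call toVector on a single row before mapping it over the whole RDD:

// First data row, already split and trimmed; the fields keep their quotes,
// which toVector relies on when reading the class digit.
val sampleRow = Array("\"1\"", "\"1st class\"", "\"adults\"", "\"man\"", "\"yes\"")
println(toVector(sampleRow))  // should print something like (1.0,[0.0,1.0,0.0])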

We apply the function we just defined to every row we have.


In [10]:
val vectorsRdd = rowsRdd.map(row => toVector(row))

Now we can check that our feature vectors were created correctly. Refer to our toVector function for the mapping.

The first instance below reads as: a person in "3rd class" who was an "adult" and a "woman" did "not survive", i.e. (0.0,[2.0,1.0,1.0]).


In [11]:
vectorsRdd.takeSample(false, 5, 0)


Out[11]:
Array((0.0,[2.0,1.0,1.0]), (1.0,[0.0,1.0,1.0]), (1.0,[0.0,1.0,1.0]), (1.0,[0.0,1.0,1.0]), (0.0,[2.0,1.0,0.0]))

Finally, let's split the data, allocating 70% for training and 30% for testing.


In [12]:
val splits = vectorsRdd.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
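
Note that randomSplit draws a random seed by default, so the exact split changes between runs. If we wanted a reproducible split we could pass a seed explicitly (a minimal sketch, with 42L as an arbitrary seed):

val splits = vectorsRdd.randomSplit(Array(0.7, 0.3), 42L)
val (trainingData, testData) = (splits(0), splits(1))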

3. Train a Decision tree model

We will start by training a decision tree model, which is a popular choice these days, although there are cases where the structure of the data might benefit from other supervised algorithms. For our case, we set numClasses to 2 because we are only concerned with a survived (1.0) or not survived (0.0) prediction. In categoricalFeaturesInfo we specify, for each categorical feature, how many distinct values it can take. Thus the first entry of the Map says that feature 0 can take 3 different values (1st, 2nd or 3rd class).


In [13]:
import org.apache.spark.mllib.tree.DecisionTree

val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]((0,3), (1,2), (2,2))
val impurity = "gini"
val maxDepth = 5
val maxBins = 32

val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
                                         impurity, maxDepth, maxBins)

Now that we have trained the model, let's make predictions.


In [14]:
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

Notice that we used only the features to predict. Below we compute the error rate and print the learned tree. You can see that the first-level if-else is based on sex (feature 2), and in many of the inner branches it looks like the model is more likely to predict 1.0 (survived) if you were a woman.


In [15]:
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / testData.count()
println("Test Error = " + testErr)
println("Learned classification tree model:\n" + model.toDebugString)


Test Error = 0.21119592875318066
Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 19 nodes
  If (feature 2 in {0.0})
   If (feature 1 in {0.0})
    If (feature 0 in {2.0})
     Predict: 0.0
    Else (feature 0 not in {2.0})
     Predict: 1.0
   Else (feature 1 not in {0.0})
    If (feature 0 in {0.0})
     Predict: 0.0
    Else (feature 0 not in {0.0})
     If (feature 0 in {1.0})
      Predict: 0.0
     Else (feature 0 not in {1.0})
      Predict: 0.0
  Else (feature 2 not in {0.0})
   If (feature 0 in {0.0,1.0})
    If (feature 0 in {0.0})
     Predict: 1.0
    Else (feature 0 not in {0.0})
     If (feature 1 in {0.0})
      Predict: 1.0
     Else (feature 1 not in {0.0})
      Predict: 1.0
   Else (feature 0 not in {0.0,1.0})
    If (feature 1 in {1.0})
     Predict: 0.0
    Else (feature 1 not in {1.0})
     Predict: 1.0

4. Play with model

Now let's have some fun: we can create hypothetical scenarios to see how our model would classify each data point. Remember that we have to talk to our model using LabeledPoint objects, so we create instances using the following:

LabeledPoint(survived, Vectors.dense(klass,age,sex))

The first 3 test passengers are men in 1st, 2nd and 3rd class. The last one is a girl in 1st class.


In [36]:
val testPassenger1 = LabeledPoint(0.0, Vectors.dense(0.0, 1.0, 0.0))
val testPassenger2 = LabeledPoint(0.0, Vectors.dense(1.0, 1.0, 0.0))
val testPassenger3 = LabeledPoint(0.0, Vectors.dense(2.0, 1.0, 0.0))
val testPassenger4 = LabeledPoint(1.0, Vectors.dense(0.0, 0.0, 1.0))

In [37]:
println(model.predict(testPassenger4.features))


1.0
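
We can run the other three test passengers (the men) through the model in the same way (a minimal sketch; the actual predictions depend on the tree learned from your random split):

Seq(testPassenger1, testPassenger2, testPassenger3).foreach { p =>
  println(model.predict(p.features))
}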

It seems that if you were a girl in 1st class our model says you were likely to survive. On the other hand, if you were a man, regardless of class, your chances were not that good.

5. Train a linear regression model

Now let's train another model, using the same trainingData object we created at the end of Section 2, for comparison purposes.


In [32]:
import org.apache.spark.mllib.regression.LinearRegressionWithSGD

val numIterations = 100
val linearRegressionModel = LinearRegressionWithSGD.train(trainingData, numIterations)

In [34]:
// Compute raw scores on the test set.
val scoreAndLabels = testData.map { point =>
  val score = linearRegressionModel.predict(point.features)
  (score, point.label)
}

In [36]:
// Get evaluation metrics.
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println("Area under ROC = " + auROC)


Area under ROC = 0.7938241963452048

Notice that we obtained an area under the ROC curve (AUC) of roughly 0.79. AUC is another way to evaluate a classifier: it measures how well the model's scores rank positive instances above negative ones (1.0 is a perfect ranking, 0.5 is random guessing).
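
If we want an error rate that is directly comparable to the decision tree's, we can threshold the regression scores (a minimal sketch, assuming an arbitrary 0.5 cutoff):

// Turn each raw score into a 0.0/1.0 prediction using a 0.5 cutoff,
// then compute the fraction of test points we got wrong.
val thresholdedErr = scoreAndLabels.filter { case (score, label) =>
  (if (score > 0.5) 1.0 else 0.0) != label
}.count.toDouble / testData.count()
println("Thresholded test error = " + thresholdedErr)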